/*###################################################################
  > File Name: storage/replica/replica_cleanup.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Mon 09 Oct 2017 07:26:38 PM PDT
###################################################################*/
#include "config.h"

#include <time.h>
#include <sys/mman.h>
#include <libaio.h>
#include <sys/types.h>
#include <utime.h>
#include <errno.h>
#include <time.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "core.h"
#include "cluster.h"
#include "disk.h"
#include "../storage/md_parent.h"
#include "lich_md.h"
#include "squeue.h"
#include "disk_maping.h"
#include "timer.h"
#include "net_global.h"
#include "bmap.h"
#include "sequence.h"
#include "dbg.h"
#include "node.h"
#include "diskmd.h"
#include "disk_redis.h"
#include "yid.h"
#include "bh_task.h"
#include "cleanup_offline_msg.h"

/**
 * @file Deletes the data of a volume inside a node.
 *
 * Cleanup tasks are registered under the lich4/cleanup directory in etcd.
 */

// TODO
#define __DB_HASH__     10
#define CLEANUP_CHUNKS  100
#define CLEANUP_RECYCLE (USEC_PER_SEC * 30)

/*
 * Scan context passed to the iterator callbacks of one cleanup pass.
 * The callbacks append replica_t nodes to raw_list and bump chknum.
 */
typedef struct {
        volid_t id;             /* volume whose replicas are being cleaned up */
        int clean_md;           /* non-zero: non-raw (metadata) chunks are cleaned as well */
        struct list_head raw_list[__DB_HASH__]; /* collected replica_t nodes; the sqlite3 scanners index it by database number, the other scanners only use slot 0 */
        uint64_t chknum;        /* number of nodes appended by the scan callbacks */
} replica_cleanup_t;

/*
 * One chunk replica queued for deletion.
 * `hook` has to remain the first member: the cleanup loops in this file
 * cast a struct list_head pointer directly to replica_t.
 */
typedef struct {
        struct list_head hook;  /* link in replica_cleanup_t.raw_list[] */
        chkid_t id;             /* chunk id of the replica */
        diskloc_t loc;          /* disk location handed to diskmd_delete / disk_maping->del */
        chkid_t parent;         /* parent chunk id reported by the scan */
        int wbdisk;             /* set to -1 by the sqlite3 scanners; not read in the code visible here */
} replica_t;

/* Life cycle of a cleanup_entry_t in the cleanup table. */
typedef enum {
        CLEANUP_WAITING = 0,    /* registered, not yet picked up by the worker thread */
        CLEANUP_RUNNING,        /* the worker thread is currently cleaning this volume */
        CLEANUP_DONE,           /* the worker thread finished cleaning this volume */
} status_t;

/* One cleanup task stored in the cleanup hash table, keyed by volid.id. */
typedef struct {
        chkid_t volid;          /* volume to clean up */
        status_t status;        /* current state of the task */
        BOOL clean_md;          /* forwarded to the cleanup routine as its clean_md argument */
} cleanup_entry_t;

/* Global state shared by the cleanup worker thread and the request path. */
typedef struct {
        sem_t sem;                      /* posted when a new task is inserted; the worker waits on it with a timeout */
        sy_rwlock_t lock;               /* taken around lookups, inserts and iteration of cleanup_table */
        msgqueue_t msgqueue;            /* not used in the code visible here */
        hashtable_t cleanup_table;      /* cleanup_entry_t objects keyed by volume id */
        bh_task_t replica_cleanup_bh_task; /* not used in the code visible here */
} replica_cleanup_bh_t;

/* Singleton cleanup state, initialised by replica_cleanup_init(). */
static replica_cleanup_bh_t __cleanup_bh__;
/* Timer that periodically re-runs __replica_cleanup_discover(). */
static worker_handler_t desc_handler;

#if DISK_MAPING_ITERATOR_NEW

static int __replica_srv_cleanup_scan(const chkid_t *chkid, const char *_pool,
                                       const diskloc_t *loc, const chkid_t *parent,
                                       const uint64_t meta_version, void *arg)
{
        int ret;
        replica_t *replica;
        replica_cleanup_t *cleanup_handle = arg;
        struct list_head *list = &cleanup_handle->raw_list[0];

        (void) meta_version;

        if (!cleanup_handle->clean_md && chkid->type != __RAW_CHUNK__) {
                return 0;
        }
        
        ret = ymalloc((void **)&replica, sizeof(replica_t));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        replica->id = *chkid;
        replica->loc = *loc;
        replica->parent = *parent;

        list_add_tail(&replica->hook, list);

        cleanup_handle->chknum++;

        return 0;
}


#else

static void __replica_srv_cleanup_scan_raw_redis(void *key, void *value, void *arg)
{
        int ret;
        replica_t *replica;
        replica_cleanup_t *cleanup_handle = arg;

        struct list_head *list = &cleanup_handle->raw_list[0];

        ret = ymalloc((void **)&replica, sizeof(replica_t));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        disk_redis_key(key, &replica->id);
        disk_redis_decode(value, NULL, &replica->loc, &replica->parent, NULL);

        list_add_tail(&replica->hook, list);

        cleanup_handle->chknum++;
}

#endif

static int __replica_srv_cleanup_scan_raw_sqlite3(va_list ap)
{
        int ret;
        replica_t *replica;

        replica_cleanup_t *cleanup_handle = va_arg(ap, replica_cleanup_t *);
        int idx = va_arg(ap, int);
        chkid_t *chkid = va_arg(ap, chkid_t *);
        uint32_t diskid = va_arg(ap, uint32_t);
        uint32_t offset = va_arg(ap, uint32_t);
        chkid_t *parent = va_arg(ap, chkid_t *);
        const char *pool = va_arg(ap, const char *);
        uint64_t meta_version = va_arg(ap, uint64_t);

        (void) pool;
        (void) meta_version;
        
        va_end(ap);

        struct list_head *list = &cleanup_handle->raw_list[idx];

        ret = ymalloc((void **)&replica, sizeof(replica_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        replica->id = *chkid;
        replica->loc.diskid = diskid;
        replica->loc.idx = offset;
        replica->parent = *parent;
        replica->wbdisk = -1;

        list_add_tail(&replica->hook, list);

        cleanup_handle->chknum++;

        return 0;
err_ret:
        return ret;
}

static int __replica_srv_cleanup_scan_md(va_list ap)
{
        replica_cleanup_t *cleanup_handle = va_arg(ap, replica_cleanup_t *);
        int idx = va_arg(ap, int);
        chkid_t *chkid = va_arg(ap, chkid_t *);
        uint32_t diskid = va_arg(ap, uint32_t);
        uint32_t offset = va_arg(ap, uint32_t);
        chkid_t *parent = va_arg(ap, chkid_t *);
        const char *pool = va_arg(ap, const char *);
        uint64_t meta_version = va_arg(ap, uint64_t);

        (void) pool;
        (void) meta_version;
        
        va_end(ap);

        int ret;
        replica_t *replica;

        // not vol chk
        // i.e. is a snapshot, or is a cloned volume
        // TODO volume's parent should be its pool
        if (0 == chkid_cmp(&cleanup_handle->id, parent)) {
                if (chkid->type != __VOLUME_SUB_CHUNK__)
                        goto out;
        }

        struct list_head *list = &cleanup_handle->raw_list[idx];

        ret = ymalloc((void **)&replica, sizeof(replica_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        replica->id = *chkid;
        replica->loc.diskid = diskid;
        replica->loc.idx = offset;
        replica->parent = *parent;
        replica->wbdisk = -1;

        list_add_tail(&replica->hook, list);

        cleanup_handle->chknum++;
out:
        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler: removes the clock of one chunk.
 *
 * The single va_list argument is the chunk id. ENOKEY and ENOENT from
 * clock_remove() are treated as success (the original note says a missing
 * clock may simply mean the chunk is dirty).
 *
 * @return 0 on success, any other clock_remove() error code otherwise
 */
static int __replica_srv_cleanup_clock__(va_list ap)
{
        const chkid_t *id = va_arg(ap, chkid_t *);
        int ret = clock_remove(id);

        if (unlikely(ret) && ret != ENOKEY && ret != ENOENT)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Removes the clock of every replica on the list, each through a
 * core_request() routed by core_hash() of the chunk id.
 *
 * Failures are only logged; the walk continues with the next replica.
 *
 * @param list  list of replica_t nodes
 * @param count unused
 *
 * @return always 0
 */
static int __replica_srv_cleanup_clock(const struct list_head *list, int count)
{
        struct list_head *cur, *next;

        (void)count;

        list_for_each_safe(cur, next, list) {
                replica_t *ent = (replica_t *)cur;
                int ret = core_request(core_hash(&ent->id), -1, "clock_remove",
                                       __replica_srv_cleanup_clock__, &ent->id);

                if (likely(ret == 0))
                        continue;

                DWARN("remove "CHKID_FORMAT" clock ret %d\n",
                                CHKID_ARG(&ent->id), ret);
        }

        return 0;
}

/**
 * Deletes the mapping records of every replica on the list with a single
 * disk_maping->del() call.
 *
 * The chunk ids and locations are copied into two temporary arrays of
 * `count` elements, which are released on every return path.
 *
 * @param list  list of replica_t nodes
 * @param count number of nodes on the list; must match exactly
 *
 * @return 0 on success, the disk_maping->del() error code otherwise;
 *         an allocation failure ends in UNIMPLEMENTED(__DUMP__)
 */
static int __replica_srv_cleanup_db(const struct list_head *list, int count)
{
        int ret, i = 0;
        replica_t *replica;
        struct list_head *pos, *n;

        chkid_t *chkid_array;
        diskloc_t *loc_array;

        ret = ymalloc((void **)&chkid_array, count * sizeof(chkid_t));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = ymalloc((void **)&loc_array, count * sizeof(diskloc_t));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(pos, n, list) {
                /* check before the store: a list longer than count must not
                 * write past the end of the arrays */
                YASSERT(i < count);

                replica = (replica_t *)pos;
                chkid_array[i] = replica->id;
                loc_array[i] = replica->loc;
                i++;
        }

        YASSERT(i == count);

        ret = disk_maping->del(chkid_array, loc_array, count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = 0;
err_ret:
        /* shared by the success and the failure path */
        yfree((void **)&chkid_array);
        yfree((void **)&loc_array);
        return ret;
}

/**
 * Releases the on-disk slot of every replica on the list through
 * diskmd_delete(), then unlinks and frees all list nodes.
 *
 * ENODEV from diskmd_delete() is tolerated silently; any other error is
 * logged and hits YASSERT(0).
 *
 * @param list  list of replica_t nodes; empty on return
 * @param count unused
 *
 * @return always 0
 */
static int __replica_srv_cleanup_diskmd(struct list_head *list, int count)
{
        int ret;
        struct list_head *cur, *next;
        replica_t *ent;

        (void) count;

        /* first pass: release the disk slots */
        list_for_each_safe(cur, next, list) {
                ent = (replica_t *)cur;
                DBUG("delete "CHKID_FORMAT" loc "LOC_FORMAT"\n", CHKID_ARG(&ent->id), LOC_ARG(&ent->loc));

                ret = diskmd_delete(&ent->loc);
                if (likely(ret == 0) || ret == ENODEV)
                        continue;

                DERROR("cleanup diskmd error %d! chkid "CHKID_FORMAT" disk %u off %u while lost\n",
                                ret, CHKID_ARG(&ent->id), ent->loc.diskid, ent->loc.idx);
                SERROR(0, "cleanup diskmd error %d! chkid "CHKID_FORMAT" disk %u off %u while lost\n",
                                ret, CHKID_ARG(&ent->id), ent->loc.diskid, ent->loc.idx);
                /*
                 * BUG: on a test machine the database and the bitmap records
                 * were once inconsistent; its root partition had filled up.
                 */
                YASSERT(0);
        }

        /* second pass: drop the list nodes themselves */
        list_for_each_safe(cur, next, list) {
                ent = (replica_t *)cur;
                list_del(&ent->hook);
                yfree((void **)&ent);
        }

        return 0;
}

/**
 * 这个函数可以说是批量调用 disk_unlink
 *
 * @Param list
 * @Param count
 *
 * @Returns
 */
static int __replica_srv_cleanup__(struct list_head *list, int count)
{
        int ret;
        struct list_head *pos, *n;
        replica_t *replica;

        ret = __replica_srv_cleanup_clock(list, count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __replica_srv_cleanup_db(list, count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __replica_srv_cleanup_diskmd(list, count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        list_for_each_safe(pos, n, list) {
                replica = (replica_t *)pos;
                list_del(&replica->hook);
                yfree((void **)&replica);
        }
        return ret;
}
/**
 * Cleans up all replicas of one volume in a single pass: the volume is
 * scanned once into raw_list[0] and the collected batch is deleted.
 *
 * Built as __replica_srv_cleanup when DISK_MAPING_ITERATOR_NEW is set
 * (scan through disk_maping->iterator_byvol), otherwise as
 * __replica_srv_cleanup_redis (scan through disk_redis_iterator_byvol).
 *
 * @param chkid    volume id
 * @param clean_md stored in the scan context for the scan callback
 *
 * @return 0 on success, the batch deletion error code otherwise
 */
#if DISK_MAPING_ITERATOR_NEW
static int __replica_srv_cleanup(const chkid_t *chkid, int clean_md)
#else
static int __replica_srv_cleanup_redis(const chkid_t *chkid, int clean_md)
#endif
{
        int ret;
        uint64_t cleaned = 0;
        replica_t *ent;
        struct list_head *cur, *next;
        replica_cleanup_t handle;
        struct list_head *queue = &handle.raw_list[0];

        DINFO("replica cleanup "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ANALYSIS_BEGIN(0);

        handle.id = *chkid;
        handle.clean_md = clean_md;
        handle.chknum = 0;
        INIT_LIST_HEAD(queue);

        DINFO("cleanup database\n");

#if DISK_MAPING_ITERATOR_NEW
        disk_maping->iterator_byvol(chkid, __replica_srv_cleanup_scan, &handle);
#else
        disk_redis_iterator_byvol(chkid, __replica_srv_cleanup_scan_raw_redis, &handle);
#endif
        if (list_empty(queue)) {
                /* nothing was found for this volume */
                DINFO("cleanup database\n");
        } else {
                cleaned += handle.chknum;

                ret = __replica_srv_cleanup__(queue, handle.chknum);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                YASSERT(list_empty(queue));
        }

        ANALYSIS_END(0, IO_WARN, "replica cleanup");

        DINFO("replica cleanuped "CHKID_FORMAT" repnum %ju\n", CHKID_ARG(chkid), cleaned);

        return 0;
err_ret:
        list_for_each_safe(cur, next, queue) {
                ent = (replica_t *)cur;
                list_del(cur);
                yfree((void **)&ent);
        }

        return ret;
}

#if DISK_MAPING_ITERATOR_NEW
#else

static int __replica_srv_cleanup_sqlite3(const chkid_t *chkid, int clean_md)
{
        int i, ret;
        uint64_t total = 0;
        replica_t *replica;
        struct list_head *pos, *n;
        replica_cleanup_t cleanup_handle;
        char _parent[MAX_BUF_LEN], raw_cdt[MAX_NAME_LEN], md_cdt[MAX_NAME_LEN];

        DWARN("replica cleanup "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ANALYSIS_BEGIN(0);

        cleanup_handle.id = *chkid;
        for(i=0; i< __DB_HASH__; i++) {
                INIT_LIST_HEAD(&cleanup_handle.raw_list[i]);
        }

        base64_encode((void *)chkid, sizeof(*chkid), _parent);

        //raw
        snprintf(raw_cdt, MAX_NAME_LEN, "parent='%s' limit %d", _parent, CLEANUP_CHUNKS);

        //metadata
        snprintf(md_cdt, MAX_NAME_LEN, "key='%s' or parent='%s'", _parent, _parent);

        for (i=0; i < __DB_HASH__; i++) {

                DINFO("cleanup database %d\n", i);

                while(1) {
                        cleanup_handle.chknum = 0;
                        diskmd_chunk_iterator_cursor("raw", i, raw_cdt, __replica_srv_cleanup_scan_raw_sqlite3, &cleanup_handle);
                        if  (list_empty(&cleanup_handle.raw_list[i])) {
                                DINFO("cleanup database %d.raw finish\n", i);
                                break;
                        }

                        total += cleanup_handle.chknum;

                        ret = __replica_srv_cleanup__(&cleanup_handle.raw_list[i], cleanup_handle.chknum);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        YASSERT(list_empty(&cleanup_handle.raw_list[i]));
                }

                if (clean_md) {
                        while(1) {
                                cleanup_handle.chknum = 0;
                                diskmd_chunk_iterator_cursor("metadata", i, md_cdt, __replica_srv_cleanup_scan_md, &cleanup_handle);
                                if  (list_empty(&cleanup_handle.raw_list[i])) {
                                        DINFO("cleanup database %d.md finish\n", i);
                                        break;
                                }

                                total += cleanup_handle.chknum;

                                ret = __replica_srv_cleanup__(&cleanup_handle.raw_list[i], cleanup_handle.chknum);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                YASSERT(list_empty(&cleanup_handle.raw_list[i]));
                        }
                }
        }

        ANALYSIS_END(0, IO_WARN, "replica cleanup");

        DWARN("replica cleanuped "CHKID_FORMAT" repnum %ju\n", CHKID_ARG(chkid), total);

        return 0;
err_ret:
        for(i=0; i<__DB_HASH__; i++) {
                list_for_each_safe(pos, n, &cleanup_handle.raw_list[i]) {
                        replica = (replica_t *)pos;
                        list_del(pos);
                        yfree((void **)&replica);
                }
        }

        return ret;
}
#endif

/**
 * Hash function of the cleanup table.
 *
 * @param k points to the 64-bit volume id used as key
 *
 * @return the id modulo 10
 */
static uint32_t __key_func(const void *k)
{
        const uint64_t volid = *(const uint64_t *)k;

        return volid % 10;
}

/**
 * Comparison function of the cleanup table.
 *
 * @param s1 stored cleanup_entry_t
 * @param s2 points to the 64-bit volume id being looked up
 *
 * @return -1, 0 or 1 when the entry's volume id is smaller than, equal to
 *         or greater than the looked-up id
 */
static int __cmp_func(const void *s1, const void *s2)
{
        const cleanup_entry_t *entry = s1;
        const uint64_t *wanted = s2;

        if (entry->volid.id < *wanted)
                return -1;

        return (entry->volid.id > *wanted) ? 1 : 0;
}

/**
 * Table iteration callback that picks the first entry still waiting.
 *
 * @param _ctx points to a cleanup_entry_t pointer that receives the match
 * @param _ent entry currently visited
 *
 * @return 1 once a CLEANUP_WAITING entry has been stored in *_ctx,
 *         0 otherwise
 */
static int __iterator_func(void *_ctx, void *_ent)
{
        cleanup_entry_t **found = _ctx;
        cleanup_entry_t *candidate = _ent;

        if (candidate->status != CLEANUP_WAITING)
                return 0;

        *found = candidate;
        return 1;
}

/**
 * Registers a cleanup task for a volume, or reports the state of the one
 * that already exists. The caller in this file holds the cleanup lock.
 *
 * @param volid    volume to clean up
 * @param clean_md stored in the new entry
 *
 * @return EEXIST if a task is already waiting,
 *         EAGAIN if a task is running or was just created (the worker
 *         semaphore is posted in the latter case),
 *         0 if the existing task is done,
 *         the ymalloc error code if the entry cannot be allocated
 */
static int __replica_cleanup_create(const chkid_t *volid, int clean_md)
{
        int ret;
        cleanup_entry_t *ent;

        ent = hash_table_find(__cleanup_bh__.cleanup_table, &volid->id);
        if (unlikely(ent != NULL)) {
                switch (ent->status) {
                case CLEANUP_WAITING:
                        return EEXIST;
                case CLEANUP_RUNNING:
                        return EAGAIN;
                case CLEANUP_DONE:
                        return 0;
                default:
                        YASSERT(0);
                        return 0;
                }
        }

        /* first request for this volume: create a waiting entry */
        ret = ymalloc((void **)&ent, sizeof(cleanup_entry_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->volid = *volid;
        ent->status = CLEANUP_WAITING;
        ent->clean_md = clean_md;

        ret = hash_table_insert(__cleanup_bh__.cleanup_table, (void *)ent, &ent->volid.id, 0);
        if (unlikely(ret)) {
                YASSERT(0);
                return 0;
        }

        /* wake the worker; the caller has to come back later */
        sem_post(&__cleanup_bh__.sem);
        return EAGAIN;
err_ret:
        return ret;
}

/**
 * Placeholder: performs no work.
 *
 * @return always 0
 */
int replica_cleanup_remove()
{
        return 0;
}

/**
 * Requests the asynchronous cleanup of a volume's replicas on this node.
 *
 * Takes the cleanup write lock and hands the request to
 * __replica_cleanup_create().
 *
 * @param chkid    volume id
 * @param clean_md forwarded to __replica_cleanup_create()
 *
 * @return 0 when the cleanup of this volume has already finished,
 *         otherwise the lock error or the code returned by
 *         __replica_cleanup_create() (EEXIST / EAGAIN while the task is
 *         pending or running)
 */
int replica_srv_cleanup(const chkid_t *chkid, int clean_md)
{
        int ret = sy_rwlock_wrlock(&__cleanup_bh__.lock);

        if (unlikely(ret))
                GOTO(err_ret ,ret);

        ret = __replica_cleanup_create(chkid, clean_md);
        if (likely(ret == 0)) {
                sy_rwlock_unlock(&__cleanup_bh__.lock);
                return 0;
        }

        GOTO(err_unlock, ret);
err_unlock:
        sy_rwlock_unlock(&__cleanup_bh__.lock);
err_ret:
        return ret;
}

/**
 * Body of the cleanup worker thread.
 *
 * Loops forever: waits on the cleanup semaphore (with a timeout of
 * CLEANUP_RECYCLE / USEC_PER_SEC), picks the first CLEANUP_WAITING entry
 * of the cleanup table, runs the cleanup for its volume and marks the
 * entry CLEANUP_DONE.
 *
 * Both status transitions happen while holding the cleanup write lock,
 * because __replica_cleanup_create() reads ent->status under that lock.
 *
 * @param arg unused
 *
 * @return never returns in practice
 */
static void *__replica_cleanup_worker__(void *arg)
{
        int ret;
        cleanup_entry_t *ent;

        (void) arg;

        DINFO("replica cleanup worker start\n");

        while(1) {
                ret = _sem_timedwait1(&__cleanup_bh__.sem, CLEANUP_RECYCLE / USEC_PER_SEC);
                if (unlikely(ret)) {
                        /* a timeout just triggers a periodic rescan */
                        if (ret != ETIMEDOUT)
                                UNIMPLEMENTED(__DUMP__);
                }

                ent = NULL;

                ret = sy_rwlock_wrlock(&__cleanup_bh__.lock);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                hash_iterate_table_entries(__cleanup_bh__.cleanup_table, __iterator_func, &ent);

                if (ent) {
                        YASSERT(ent->status == CLEANUP_WAITING);

                        /* claim the entry before anyone else can look at it */
                        ent->status = CLEANUP_RUNNING;
                }

                sy_rwlock_unlock(&__cleanup_bh__.lock);

                /* nothing is waiting */
                if (ent == NULL)
                        continue;

                DWARN("cleanup replicas "CHKID_FORMAT"\n", CHKID_ARG(&ent->volid));

#if DISK_MAPING_ITERATOR_NEW
                ret = __replica_srv_cleanup(&ent->volid, ent->clean_md);
#else
                if (gloconf.kv_redis) {
                        ret = __replica_srv_cleanup_redis(&ent->volid, ent->clean_md);
                } else {
                        ret = __replica_srv_cleanup_sqlite3(&ent->volid, ent->clean_md);
                }
#endif
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                ret = sy_rwlock_wrlock(&__cleanup_bh__.lock);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                ent->status = CLEANUP_DONE;

                sy_rwlock_unlock(&__cleanup_bh__.lock);
        }

        return NULL;
}

/*
 * List node carrying one volume id found by the offline message scan.
 * `hook` has to remain the first member: the discover loop casts a
 * struct list_head pointer directly to vol_seg_t.
 */
typedef struct {
        struct list_head hook;
        volid_t id;     /* volume id parsed from the message key */
} vol_seg_t;

/**
 * offline_msg_iterator() callback: turns one message key into a vol_seg_t
 * and appends it to the caller's list.
 *
 * @param key message key, parsed with str2chkid() into the volume id
 * @param arg struct list_head that collects the vol_seg_t nodes
 *
 * @return 0 on success, the ymalloc error code otherwise
 */
static int __msg_iterator_func(void *key, void *arg)
{
        int ret;
        vol_seg_t *seg;
        struct list_head *collected = arg;

        ret = ymalloc((void **)&seg, sizeof(vol_seg_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        str2chkid(&seg->id, key);
        list_add_tail(&seg->hook, collected);

        return 0;
err_ret:
        return ret;
}

static int __replica_cleanup_discover__(void *_args) {
        int ret;
        vol_seg_t *vol;
        struct list_head list;
        struct list_head *pos, *n;

        (void) _args;
        const nid_t *nid = net_getnid();

        INIT_LIST_HEAD(&list);

        ret = offline_msg_iterator(nid, __msg_iterator_func, &list);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        list_for_each_safe(pos, n, &list) {
                vol = (vol_seg_t *)pos;
                list_del(pos);

                ret = replica_srv_cleanup(&vol->id, 1);
                if (likely(ret)) {
                        if (ret != EEXIST && ret != EAGAIN) {
                                DERROR(""CHKID_FORMAT" try to cleanup error %d\n", CHKID_ARG(&vol->id), ret);
                                GOTO(err_ret, ret);
                        } else {
                                //nothing todo
                        }
                } else {
                        /*finish*/
                        ret = offline_msg_rm(nid, &vol->id);
                        if (unlikely(ret)) {
                                //nothing todo
                                if (ret != ENOENT)
                                        DERROR(""CHKID_FORMAT" cleanup finish, but rm etcd ret %d\n", CHKID_ARG(&vol->id), ret);
                        }
                }

                yfree((void **)&vol);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Timer callback: runs one discovery round and re-arms the timer for the
 * next one after CLEANUP_RECYCLE.
 *
 * The result of the discovery round is ignored; a failure to re-arm the
 * timer hits YASSERT(0).
 *
 * @param _args forwarded to __replica_cleanup_discover__()
 *
 * @return always 0
 */
static int __replica_cleanup_discover(void *_args)
{
        int rearm;

        __replica_cleanup_discover__(_args);

        rearm = timer1_settime_retry(&desc_handler, CLEANUP_RECYCLE, 3);
        if (unlikely(rearm))
                YASSERT(0);

        return 0;
}

/**
 * Sets up the asynchronous cleanup of replica data that belongs to
 * deleted volumes; the related tasks are recorded in etcd.
 *
 * Initialises the semaphore, the lock and the cleanup table, starts the
 * worker thread and arms the discovery timer.
 *
 * @todo How should this be handled if the owning pool has been deleted?
 *
 * @see diskmd.diskmd_pool_cleanup
 *
 * @return 0 on success, an error code otherwise
 */
int replica_cleanup_init()
{
        int ret;

        DINFO("replica cleanup init\n");
        ret = sem_init(&__cleanup_bh__.sem, 0, 0);
        if (unlikely(ret < 0)) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_init(&__cleanup_bh__.lock, "replica_cleanup");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __cleanup_bh__.cleanup_table = hash_create_table(__cmp_func, __key_func, "replica_cleanup");
        if (__cleanup_bh__.cleanup_table == NULL) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        /* without the worker no task would ever run, so report the failure */
        ret = sy_thread_create(__replica_cleanup_worker__, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_create(&desc_handler, "descover", __replica_cleanup_discover, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = timer1_settime(&desc_handler, CLEANUP_RECYCLE);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}
